package com.corn.flink.lesson3;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Demo job that pushes a small, fixed collection of {@link Person} records into Redis
 * through a {@link RedisSink}.
 *
 * <p>Every record is written with the {@code HSET} command. The command description is
 * created with {@code "city_person_data"} as its additional key, the value returned by
 * {@code Person#getAddress()} is supplied as the record key, and {@code Person#toString()}
 * is supplied as the record value.
 *
 * @author JimWu
 * @date 2023/2/28 17:47
 */
public class FlinkRedisSinkOperatorDemo {

    /** Host of the Redis server the sink connects to. */
    private static final String REDIS_HOST = "localhost";

    /** Port of the Redis server the sink connects to. */
    private static final int REDIS_PORT = 6379;

    /** Additional key handed to the HSET command description. */
    private static final String HASH_NAME = "city_person_data";

    /**
     * Builds the pipeline (in-memory source, Redis sink) and runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connection settings for the Redis instance that receives the records.
        FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
                .setHost(REDIS_HOST)
                .setPort(REDIS_PORT)
                .build();

        // Fixed sample data; several entries deliberately share the same address.
        DataStreamSource<Person> people = environment.fromElements(
                new Person("alice", 15, "cq"),
                new Person("jack", 18, "cq"),
                new Person("rose", 20, "sh"),
                new Person("mick", 42, "gz"),
                new Person("white", 12, "cq"),
                new Person("maria", 32, "sz"),
                new Person("candy", 56, "bj"),
                new Person("jerry", 28, "wh")
        );

        RedisSink<Person> redisSink = new RedisSink<>(redisConfig, new PersonByAddressMapper());
        people.addSink(redisSink);

        environment.execute();
    }

    /**
     * Maps a {@link Person} onto an {@code HSET} write: the address is the key and the
     * person's string form is the value.
     *
     * <p>NOTE(review): records with the same address produce the same key, so they
     * presumably overwrite one another in Redis — confirm this is the intended outcome.
     */
    private static final class PersonByAddressMapper implements RedisMapper<Person> {

        /** Describes the Redis command used for every record. */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME);
        }

        /** Uses the person's address as the key of the write. */
        @Override
        public String getKeyFromData(Person person) {
            return person.getAddress();
        }

        /** Uses the person's {@code toString()} form as the stored value. */
        @Override
        public String getValueFromData(Person person) {
            return person.toString();
        }
    }
}
